package com.atguigu.flink.sql.connector;

import com.atguigu.flink.function.WaterSensorMapFunction;
import com.atguigu.flink.pojo.WaterSensor;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Demo: {@code insert into <target table> select * from <source table>}.
 *   Source table: converted from a DataStream.
 *   Target table: data is ultimately written to MySQL via the JDBC connector.
 */
public class Demo6_WriteMysql {

    /**
     * DDL for the JDBC sink table. A primary key must be declared so the
     * connector can perform upsert writes into MySQL.
     */
    private static final String SINK_DDL =
        "CREATE TABLE t1 (" +
        "  id STRING," +
        "  ts BIGINT," +
        "  vc INT ," +
        "   PRIMARY KEY (id) NOT ENFORCED " +
        ")  WITH (" +
        "  'connector' = 'jdbc',   " +
        "  'url' = 'jdbc:mysql://hadoop102:3306/Mybatis?useSSL=false&useUnicode=true&characterEncoding=UTF-8', " +
        "  'table-name' = 'ws'   ," +
        "  'driver' = 'com.mysql.cj.jdbc.Driver'   ," +
        "  'username' = 'root'   ," +
        "  'password' = '000000'   " +
        ")";

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Source: parse raw socket lines into WaterSensor records.
        SingleOutputStreamOperator<WaterSensor> sensorStream =
            env.socketTextStream("hadoop102", 8888)
               .map(new WaterSensorMapFunction());

        // Register the stream as temporary source table t2.
        Table sourceTable = tableEnv.fromDataStream(sensorStream);
        tableEnv.createTemporaryView("t2", sourceTable);

        // Register the MySQL-backed sink table t1.
        tableEnv.executeSql(SINK_DDL);

        // Execute the write-out. executeSql on an INSERT submits its own job,
        // so no env.execute() call is required.
        tableEnv.executeSql("insert into t1 select * from t2");
    }
}
